package com.tpvlog.dfs.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * NIO Client，负责跟DataNode进行网络通信
 *
 * @author Ressmix
 */
public class DFSNIOClient {

    // 文件上传
    public static final Integer SEND_FILE = 1;
    // 文件下载
    public static final Integer READ_FILE = 2;

    /**
     * 上传文件
     *
     * @param hostname datanode hostname
     * @param nioPort  datanode nio port
     * @param file     file byte array
     * @param fileSize file size
     */
    public static void sendFile(String hostname, int nioPort, byte[] file, String filename, long fileSize) {

        Selector selector = null;
        SocketChannel channel = null;
        ByteBuffer buffer = null;

        try {
            // 1.与建立短连接
            selector = Selector.open();
            channel = SocketChannel.open();
            // 非阻塞模式，调用完connect立即返回
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress(hostname, nioPort));
            // 关注SocketChannel的OP_CONNECT事件
            channel.register(selector, SelectionKey.OP_CONNECT);

            boolean sending = true;
            while (sending) {
                // 这里是同步调用，线程会同步等待直到SocketChannel有事件发生，所以NIO是一种“同步非阻塞模式”
                selector.select();

                // 2.遍历SelectionKey
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keysIterator.next();
                    keysIterator.remove();

                    // 3.1 连接建立
                    if (key.isConnectable()) {
                        channel = (SocketChannel) key.channel();
                        // 该channel正在建立TCP连接
                        if (channel.isConnectionPending()) {
                            // 三次握手，直到TCP连接建立完成
                            while (!channel.finishConnect()) {
                                Thread.sleep(100);
                            }
                        }
                        System.out.println("完成与服务端的连接的建立......");
                        key.interestOps(SelectionKey.OP_WRITE);
                    }
                    // 3.2 发送数据到NIO Server
                    else if (key.isWritable()) {
                        if (buffer == null) {
                            // 封装文件请求数据（固定格式：TYPE+文件名大小+文件名+文件大小+文件内容）
                            // requestType | filenameLength | filename | fileSize | file
                            buffer = ByteBuffer.allocate(4 + 4 + filename.getBytes().length + 8 + (int) fileSize);
                            System.out.println("准备发送的数据包大小为：" + buffer.capacity());

                            // 4字节请求类型标识：1-文件上传 2-文件下载
                            buffer.putInt(SEND_FILE);
                            // 4字节文件名大小标识
                            buffer.putInt(filename.getBytes().length);
                            // 文件名内容
                            buffer.put(filename.getBytes());
                            // 8字节文件内容大小标识
                            buffer.putLong(fileSize);
                            // 文件内容
                            buffer.put(file);
                            buffer.rewind();

                            int sent = channel.write(buffer);
                            System.out.println("已经发送了" + sent + " bytes的数据到服务端去");

                            // 这里是对“拆包”问题进行处理”
                            if (buffer.hasRemaining()) {
                                System.out.println("本次数据包没有发送完毕，下次会继续发送.......");
                                key.interestOps(SelectionKey.OP_WRITE);
                            } else {
                                System.out.println("本次数据包发送完毕，准备读取服务端的响应......");
                                buffer = null;
                                key.interestOps(SelectionKey.OP_READ);
                            }
                        } else {
                            channel = (SocketChannel) key.channel();
                            int sent = channel.write(buffer);
                            System.out.println("上一次数据包没有发送完毕，本次继续发送了" + sent + " bytes");
                            if (!buffer.hasRemaining()) {
                                System.out.println("本次数据包没有发送完毕，下次会继续发送.......");
                                key.interestOps(SelectionKey.OP_READ);
                            }
                        }
                    }
                    // 3.3 接收NIOServer响应
                    else if (key.isReadable()) {
                        channel = (SocketChannel) key.channel();
                        buffer = ByteBuffer.allocate(1024);
                        int len = channel.read(buffer);
                        buffer.flip();
                        if (len > 0) {
                            System.out.println("[" + Thread.currentThread().getName()
                                    + "]收到" + hostname + "的响应：" + new String(buffer.array(), 0, len));
                            sending = false;
                        }
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("upload file err");
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 下载文件
     *
     * @param hostname datanode hostname
     * @param nioPort  datanode nio port
     */
    public static byte[] readFile(String hostname, Integer nioPort, String filename) {
        ByteBuffer fileLengthBuffer = null;
        Long fileLength = null;

        ByteBuffer fileBuffer = null;
        byte[] file = null;

        SocketChannel channel = null;
        Selector selector = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress(hostname, nioPort));
            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_CONNECT);

            boolean reading = true;
            while (reading) {
                selector.select();
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keysIterator.next();
                    keysIterator.remove();

                    // 1.建立连接
                    if (key.isConnectable()) {
                        channel = (SocketChannel) key.channel();
                        if (channel.isConnectionPending()) {
                            // 三次握手，直到TCP连接建立完成
                            while (!channel.finishConnect()) {
                                Thread.sleep(100);
                            }
                        }
                        System.out.println("完成与服务端的连接的建立......");

                        // 1.1 立即发送一个请求给服务端，告知要下载的文件
                        // requestType | filenameLength | filename | fileSize
                        byte[] filenameBytes = filename.getBytes();
                        ByteBuffer readFileRequest = ByteBuffer.allocate(4 + 4 + filenameBytes.length);
                        readFileRequest.putInt(READ_FILE);
                        readFileRequest.putInt(filenameBytes.length);
                        readFileRequest.put(filenameBytes);
                        readFileRequest.flip();

                        channel.write(readFileRequest);
                        System.out.println("发送文件下载的请求过去......");
                        key.interestOps(SelectionKey.OP_READ);
                    }
                    // 2.读取响应内容
                    else if (key.isReadable()) {
                        channel = (SocketChannel) key.channel();

                        // 处理文件大小的拆包
                        if (fileLength == null) {
                            if (fileLengthBuffer == null) {
                                fileLengthBuffer = ByteBuffer.allocate(8);
                            }
                            channel.read(fileLengthBuffer);
                            if (!fileLengthBuffer.hasRemaining()) {
                                fileLengthBuffer.rewind();
                                fileLength = fileLengthBuffer.getLong();
                                System.out.println("从服务端返回数据中解析文件大小：" + fileLength);
                            }
                        }
                        // 处理文件内容的拆包
                        if (fileLength != null) {
                            if (fileBuffer == null) {
                                fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
                            }
                            int hasRead = channel.read(fileBuffer);
                            System.out.println("从服务端读取了" + hasRead + " bytes的数据出来到内存中");
                            if (!fileBuffer.hasRemaining()) {
                                fileBuffer.rewind();
                                file = fileBuffer.array();
                                System.out.println("最终获取到的文件的大小为" + file.length + " bytes");
                                reading = false;
                            }
                        }
                    }
                }
            }
            return file;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return null;
    }
}
